1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.assertArrayEquals;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyInt;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.verify;
27
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.mockito.Matchers;
43 import org.mockito.MockitoAnnotations;
44
45 import rx.Notification;
46 import rx.Observable;
47 import rx.Observable.OnSubscribe;
48 import rx.Observer;
49 import rx.Subscriber;
50 import rx.Subscription;
51 import rx.exceptions.TestException;
52 import rx.functions.Action0;
53 import rx.functions.Action1;
54 import rx.functions.Func1;
55 import rx.internal.util.UtilityFunctions;
56 import rx.observables.GroupedObservable;
57 import rx.observers.TestSubscriber;
58 import rx.schedulers.Schedulers;
59
60 public class OperatorGroupByTest {
61
62 final Func1<String, Integer> length = new Func1<String, Integer>() {
63 @Override
64 public Integer call(String s) {
65 return s.length();
66 }
67 };
68
69 @Test
70 public void testGroupBy() {
71 Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
72 Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
73
74 Map<Integer, Collection<String>> map = toMap(grouped);
75
76 assertEquals(3, map.size());
77 assertArrayEquals(Arrays.asList("one", "two", "six").toArray(), map.get(3).toArray());
78 assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
79 assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
80 }
81
82 @Test
83 public void testGroupByWithElementSelector() {
84 Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
85 Observable<GroupedObservable<Integer, Integer>> grouped = source.lift(new OperatorGroupBy<String, Integer, Integer>(length, length));
86
87 Map<Integer, Collection<Integer>> map = toMap(grouped);
88
89 assertEquals(3, map.size());
90 assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
91 assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
92 assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
93 }
94
95 @Test
96 public void testGroupByWithElementSelector2() {
97 Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
98 Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
99
100 Map<Integer, Collection<Integer>> map = toMap(grouped);
101
102 assertEquals(3, map.size());
103 assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
104 assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
105 assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
106 }
107
108 @Test
109 public void testEmpty() {
110 Observable<String> source = Observable.empty();
111 Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
112
113 Map<Integer, Collection<String>> map = toMap(grouped);
114
115 assertTrue(map.isEmpty());
116 }
117
118 @Test
119 public void testError() {
120 Observable<String> sourceStrings = Observable.just("one", "two", "three", "four", "five", "six");
121 Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
122 Observable<String> source = Observable.concat(sourceStrings, errorSource);
123
124 Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
125
126 final AtomicInteger groupCounter = new AtomicInteger();
127 final AtomicInteger eventCounter = new AtomicInteger();
128 final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
129
130 grouped.flatMap(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {
131
132 @Override
133 public Observable<String> call(final GroupedObservable<Integer, String> o) {
134 groupCounter.incrementAndGet();
135 return o.map(new Func1<String, String>() {
136
137 @Override
138 public String call(String v) {
139 return "Event => key: " + o.getKey() + " value: " + v;
140 }
141 });
142 }
143 }).subscribe(new Subscriber<String>() {
144
145 @Override
146 public void onCompleted() {
147
148 }
149
150 @Override
151 public void onError(Throwable e) {
152 e.printStackTrace();
153 error.set(e);
154 }
155
156 @Override
157 public void onNext(String v) {
158 eventCounter.incrementAndGet();
159 System.out.println(v);
160
161 }
162 });
163
164 assertEquals(3, groupCounter.get());
165 assertEquals(6, eventCounter.get());
166 assertNotNull(error.get());
167 }
168
169 private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
170
171 final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
172
173 observable.toBlocking().forEach(new Action1<GroupedObservable<K, V>>() {
174
175 @Override
176 public void call(final GroupedObservable<K, V> o) {
177 result.put(o.getKey(), new ConcurrentLinkedQueue<V>());
178 o.subscribe(new Action1<V>() {
179
180 @Override
181 public void call(V v) {
182 result.get(o.getKey()).add(v);
183 }
184
185 });
186 }
187 });
188
189 return result;
190 }
191
192
193
194
195
196
197 @Test
198 public void testGroupedEventStream() throws Throwable {
199
200 final AtomicInteger eventCounter = new AtomicInteger();
201 final AtomicInteger subscribeCounter = new AtomicInteger();
202 final AtomicInteger groupCounter = new AtomicInteger();
203 final CountDownLatch latch = new CountDownLatch(1);
204 final int count = 100;
205 final int groupCount = 2;
206
207 Observable<Event> es = Observable.create(new Observable.OnSubscribe<Event>() {
208
209 @Override
210 public void call(final Subscriber<? super Event> observer) {
211 System.out.println("*** Subscribing to EventStream ***");
212 subscribeCounter.incrementAndGet();
213 new Thread(new Runnable() {
214
215 @Override
216 public void run() {
217 for (int i = 0; i < count; i++) {
218 Event e = new Event();
219 e.source = i % groupCount;
220 e.message = "Event-" + i;
221 observer.onNext(e);
222 }
223 observer.onCompleted();
224 }
225
226 }).start();
227 }
228
229 });
230
231 es.groupBy(new Func1<Event, Integer>() {
232
233 @Override
234 public Integer call(Event e) {
235 return e.source;
236 }
237 }).flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
238
239 @Override
240 public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
241 System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey());
242 groupCounter.incrementAndGet();
243
244 return eventGroupedObservable.map(new Func1<Event, String>() {
245
246 @Override
247 public String call(Event event) {
248 return "Source: " + event.source + " Message: " + event.message;
249 }
250 });
251
252 }
253 }).subscribe(new Subscriber<String>() {
254
255 @Override
256 public void onCompleted() {
257 latch.countDown();
258 }
259
260 @Override
261 public void onError(Throwable e) {
262 e.printStackTrace();
263 latch.countDown();
264 }
265
266 @Override
267 public void onNext(String outputMessage) {
268 System.out.println(outputMessage);
269 eventCounter.incrementAndGet();
270 }
271 });
272
273 latch.await(5000, TimeUnit.MILLISECONDS);
274 assertEquals(1, subscribeCounter.get());
275 assertEquals(groupCount, groupCounter.get());
276 assertEquals(count, eventCounter.get());
277
278 }
279
280
281
282
283 @Test
284 public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws InterruptedException {
285 final AtomicInteger subscribeCounter = new AtomicInteger();
286 final AtomicInteger sentEventCounter = new AtomicInteger();
287 doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(SYNC_INFINITE_OBSERVABLE_OF_EVENT(2, subscribeCounter, sentEventCounter), subscribeCounter);
288 Thread.sleep(500);
289 assertEquals(39, sentEventCounter.get());
290 }
291
292
293
294
295 @Test
296 public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException {
297 final AtomicInteger subscribeCounter = new AtomicInteger();
298 final AtomicInteger sentEventCounter = new AtomicInteger();
299 doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(ASYNC_INFINITE_OBSERVABLE_OF_EVENT(2, subscribeCounter, sentEventCounter), subscribeCounter);
300 Thread.sleep(500);
301 assertEquals(39, sentEventCounter.get());
302 }
303
304 private void doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(Observable<Event> es, AtomicInteger subscribeCounter) throws InterruptedException {
305 final AtomicInteger eventCounter = new AtomicInteger();
306 final AtomicInteger groupCounter = new AtomicInteger();
307 final CountDownLatch latch = new CountDownLatch(1);
308
309 es.groupBy(new Func1<Event, Integer>() {
310
311 @Override
312 public Integer call(Event e) {
313 return e.source;
314 }
315 })
316 .take(1)
317 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
318
319 @Override
320 public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
321 System.out.println("testUnsubscribe => GroupedObservable Key: " + eventGroupedObservable.getKey());
322 groupCounter.incrementAndGet();
323
324 return eventGroupedObservable
325 .take(20)
326 .map(new Func1<Event, String>() {
327
328 @Override
329 public String call(Event event) {
330 return "testUnsubscribe => Source: " + event.source + " Message: " + event.message;
331 }
332 });
333
334 }
335 }).subscribe(new Subscriber<String>() {
336
337 @Override
338 public void onCompleted() {
339 latch.countDown();
340 }
341
342 @Override
343 public void onError(Throwable e) {
344 e.printStackTrace();
345 latch.countDown();
346 }
347
348 @Override
349 public void onNext(String outputMessage) {
350 System.out.println(outputMessage);
351 eventCounter.incrementAndGet();
352 }
353 });
354
355 if (!latch.await(2000, TimeUnit.MILLISECONDS)) {
356 fail("timed out so likely did not unsubscribe correctly");
357 }
358 assertEquals(1, subscribeCounter.get());
359 assertEquals(1, groupCounter.get());
360 assertEquals(20, eventCounter.get());
361
362
363
364 }
365
366 @Test
367 public void testUnsubscribeViaTakeOnGroupThenMergeAndTake() {
368 final AtomicInteger subscribeCounter = new AtomicInteger();
369 final AtomicInteger sentEventCounter = new AtomicInteger();
370 final AtomicInteger eventCounter = new AtomicInteger();
371
372 SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
373 .groupBy(new Func1<Event, Integer>() {
374
375 @Override
376 public Integer call(Event e) {
377 return e.source;
378 }
379 })
380
381 .take(2)
382 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
383
384 @Override
385 public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
386 return eventGroupedObservable
387 .map(new Func1<Event, String>() {
388
389 @Override
390 public String call(Event event) {
391 return "testUnsubscribe => Source: " + event.source + " Message: " + event.message;
392 }
393 });
394
395 }
396 })
397 .take(30).subscribe(new Action1<String>() {
398
399 @Override
400 public void call(String s) {
401 eventCounter.incrementAndGet();
402 System.out.println("=> " + s);
403 }
404
405 });
406
407 assertEquals(30, eventCounter.get());
408
409 assertEquals(58, sentEventCounter.get());
410 }
411
412 @Test
413 public void testUnsubscribeViaTakeOnGroupThenTakeOnInner() {
414 final AtomicInteger subscribeCounter = new AtomicInteger();
415 final AtomicInteger sentEventCounter = new AtomicInteger();
416 final AtomicInteger eventCounter = new AtomicInteger();
417
418 SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
419 .groupBy(new Func1<Event, Integer>() {
420
421 @Override
422 public Integer call(Event e) {
423 return e.source;
424 }
425 })
426
427 .take(2)
428 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
429
430 @Override
431 public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
432 int numToTake = 0;
433 if (eventGroupedObservable.getKey() == 1) {
434 numToTake = 10;
435 } else if (eventGroupedObservable.getKey() == 2) {
436 numToTake = 5;
437 }
438 return eventGroupedObservable
439 .take(numToTake)
440 .map(new Func1<Event, String>() {
441
442 @Override
443 public String call(Event event) {
444 return "testUnsubscribe => Source: " + event.source + " Message: " + event.message;
445 }
446 });
447
448 }
449 })
450 .subscribe(new Action1<String>() {
451
452 @Override
453 public void call(String s) {
454 eventCounter.incrementAndGet();
455 System.out.println("=> " + s);
456 }
457
458 });
459
460 assertEquals(15, eventCounter.get());
461
462 assertEquals(37, sentEventCounter.get());
463 }
464
465 @Test
466 public void testStaggeredCompletion() throws InterruptedException {
467 final AtomicInteger eventCounter = new AtomicInteger();
468 final CountDownLatch latch = new CountDownLatch(1);
469 Observable.range(0, 100)
470 .groupBy(new Func1<Integer, Integer>() {
471
472 @Override
473 public Integer call(Integer i) {
474 return i % 2;
475 }
476 })
477 .flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
478
479 @Override
480 public Observable<Integer> call(GroupedObservable<Integer, Integer> group) {
481 if (group.getKey() == 0) {
482 return group.delay(100, TimeUnit.MILLISECONDS).map(new Func1<Integer, Integer>() {
483 @Override
484 public Integer call(Integer t) {
485 return t * 10;
486 }
487
488 });
489 } else {
490 return group;
491 }
492 }
493 })
494 .subscribe(new Subscriber<Integer>() {
495
496 @Override
497 public void onCompleted() {
498 System.out.println("=> onCompleted");
499 latch.countDown();
500 }
501
502 @Override
503 public void onError(Throwable e) {
504 e.printStackTrace();
505 latch.countDown();
506 }
507
508 @Override
509 public void onNext(Integer s) {
510 eventCounter.incrementAndGet();
511 System.out.println("=> " + s);
512 }
513 });
514
515 if (!latch.await(3000, TimeUnit.MILLISECONDS)) {
516 fail("timed out");
517 }
518
519 assertEquals(100, eventCounter.get());
520 }
521
522 @Test(timeout = 1000)
523 public void testCompletionIfInnerNotSubscribed() throws InterruptedException {
524 final CountDownLatch latch = new CountDownLatch(1);
525 final AtomicInteger eventCounter = new AtomicInteger();
526 Observable.range(0, 100)
527 .groupBy(new Func1<Integer, Integer>() {
528
529 @Override
530 public Integer call(Integer i) {
531 return i % 2;
532 }
533 })
534 .subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
535
536 @Override
537 public void onCompleted() {
538 latch.countDown();
539 }
540
541 @Override
542 public void onError(Throwable e) {
543 e.printStackTrace();
544 latch.countDown();
545 }
546
547 @Override
548 public void onNext(GroupedObservable<Integer, Integer> s) {
549 eventCounter.incrementAndGet();
550 System.out.println("=> " + s);
551 }
552 });
553 if (!latch.await(500, TimeUnit.MILLISECONDS)) {
554 fail("timed out - never got completion");
555 }
556 assertEquals(2, eventCounter.get());
557 }
558
559 @Test
560 public void testIgnoringGroups() {
561 final AtomicInteger subscribeCounter = new AtomicInteger();
562 final AtomicInteger sentEventCounter = new AtomicInteger();
563 final AtomicInteger eventCounter = new AtomicInteger();
564
565 SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
566 .groupBy(new Func1<Event, Integer>() {
567
568 @Override
569 public Integer call(Event e) {
570 return e.source;
571 }
572 })
573 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
574
575 @Override
576 public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
577 Observable<Event> eventStream = eventGroupedObservable;
578 if (eventGroupedObservable.getKey() >= 2) {
579
580 eventStream = eventGroupedObservable.filter(new Func1<Event, Boolean>() {
581
582 @Override
583 public Boolean call(Event t1) {
584 return false;
585 }
586
587 });
588 }
589
590 return eventStream
591 .map(new Func1<Event, String>() {
592
593 @Override
594 public String call(Event event) {
595 return "testUnsubscribe => Source: " + event.source + " Message: " + event.message;
596 }
597 });
598
599 }
600 })
601 .take(30).subscribe(new Action1<String>() {
602
603 @Override
604 public void call(String s) {
605 eventCounter.incrementAndGet();
606 System.out.println("=> " + s);
607 }
608
609 });
610
611 assertEquals(30, eventCounter.get());
612
613 assertEquals(60, sentEventCounter.get());
614 }
615
616 @Test
617 public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsAndThenComplete() throws InterruptedException {
618 final CountDownLatch first = new CountDownLatch(2);
619 final ArrayList<String> results = new ArrayList<String>();
620 Observable.create(new OnSubscribe<Integer>() {
621
622 @Override
623 public void call(Subscriber<? super Integer> sub) {
624 sub.onNext(1);
625 sub.onNext(2);
626 sub.onNext(1);
627 sub.onNext(2);
628 try {
629 first.await();
630 } catch (InterruptedException e) {
631 sub.onError(e);
632 return;
633 }
634 sub.onNext(3);
635 sub.onNext(3);
636 sub.onCompleted();
637 }
638
639 }).groupBy(new Func1<Integer, Integer>() {
640
641 @Override
642 public Integer call(Integer t) {
643 return t;
644 }
645
646 }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
647
648 @Override
649 public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
650 if (group.getKey() < 3) {
651 return group.map(new Func1<Integer, String>() {
652
653 @Override
654 public String call(Integer t1) {
655 return "first groups: " + t1;
656 }
657
658 })
659
660 .take(2).doOnCompleted(new Action0() {
661
662 @Override
663 public void call() {
664 first.countDown();
665 }
666
667 });
668 } else {
669 return group.map(new Func1<Integer, String>() {
670
671 @Override
672 public String call(Integer t1) {
673 return "last group: " + t1;
674 }
675
676 });
677 }
678 }
679
680 }).toBlocking().forEach(new Action1<String>() {
681
682 @Override
683 public void call(String s) {
684 results.add(s);
685 }
686
687 });
688
689 System.out.println("Results: " + results);
690 assertEquals(6, results.size());
691 }
692
693 @Test
694 public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
695 System.err.println("----------------------------------------------------------------------------------------------");
696 final CountDownLatch first = new CountDownLatch(2);
697 final ArrayList<String> results = new ArrayList<String>();
698 Observable.create(new OnSubscribe<Integer>() {
699
700 @Override
701 public void call(Subscriber<? super Integer> sub) {
702 sub.onNext(1);
703 sub.onNext(2);
704 sub.onNext(1);
705 sub.onNext(2);
706 try {
707 first.await();
708 } catch (InterruptedException e) {
709 sub.onError(e);
710 return;
711 }
712 sub.onNext(3);
713 sub.onNext(3);
714 sub.onCompleted();
715 }
716
717 }).groupBy(new Func1<Integer, Integer>() {
718
719 @Override
720 public Integer call(Integer t) {
721 return t;
722 }
723
724 }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
725
726 @Override
727 public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
728 if (group.getKey() < 3) {
729 return group.map(new Func1<Integer, String>() {
730
731 @Override
732 public String call(Integer t1) {
733 return "first groups: " + t1;
734 }
735
736 })
737
738 .take(2).doOnCompleted(new Action0() {
739
740 @Override
741 public void call() {
742 first.countDown();
743 }
744
745 });
746 } else {
747 return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
748
749 @Override
750 public String call(Integer t1) {
751 return "last group: " + t1;
752 }
753
754 }).doOnEach(new Action1<Notification<? super String>>() {
755
756 @Override
757 public void call(Notification<? super String> t1) {
758 System.err.println("subscribeOn notification => " + t1);
759 }
760
761 });
762 }
763 }
764
765 }).doOnEach(new Action1<Notification<? super String>>() {
766
767 @Override
768 public void call(Notification<? super String> t1) {
769 System.err.println("outer notification => " + t1);
770 }
771
772 }).toBlocking().forEach(new Action1<String>() {
773
774 @Override
775 public void call(String s) {
776 results.add(s);
777 }
778
779 });
780
781 System.out.println("Results: " + results);
782 assertEquals(6, results.size());
783 }
784
785 @Test
786 public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenObservesOnAndDelaysAndThenCompletes() throws InterruptedException {
787 final CountDownLatch first = new CountDownLatch(2);
788 final ArrayList<String> results = new ArrayList<String>();
789 Observable.create(new OnSubscribe<Integer>() {
790
791 @Override
792 public void call(Subscriber<? super Integer> sub) {
793 sub.onNext(1);
794 sub.onNext(2);
795 sub.onNext(1);
796 sub.onNext(2);
797 try {
798 first.await();
799 } catch (InterruptedException e) {
800 sub.onError(e);
801 return;
802 }
803 sub.onNext(3);
804 sub.onNext(3);
805 sub.onCompleted();
806 }
807
808 }).groupBy(new Func1<Integer, Integer>() {
809
810 @Override
811 public Integer call(Integer t) {
812 return t;
813 }
814
815 }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
816
817 @Override
818 public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
819 if (group.getKey() < 3) {
820 return group.map(new Func1<Integer, String>() {
821
822 @Override
823 public String call(Integer t1) {
824 return "first groups: " + t1;
825 }
826
827 })
828
829 .take(2).doOnCompleted(new Action0() {
830
831 @Override
832 public void call() {
833 first.countDown();
834 }
835
836 });
837 } else {
838 return group.observeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
839
840 @Override
841 public String call(Integer t1) {
842 return "last group: " + t1;
843 }
844
845 });
846 }
847 }
848
849 }).toBlocking().forEach(new Action1<String>() {
850
851 @Override
852 public void call(String s) {
853 results.add(s);
854 }
855
856 });
857
858 System.out.println("Results: " + results);
859 assertEquals(6, results.size());
860 }
861
862 @Test
863 public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
864 final ArrayList<String> results = new ArrayList<String>();
865 Observable.create(new OnSubscribe<Integer>() {
866
867 @Override
868 public void call(Subscriber<? super Integer> sub) {
869 sub.onNext(1);
870 sub.onNext(2);
871 sub.onNext(1);
872 sub.onNext(2);
873 sub.onCompleted();
874 }
875
876 }).groupBy(new Func1<Integer, Integer>() {
877
878 @Override
879 public Integer call(Integer t) {
880 return t;
881 }
882
883 }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
884
885 @Override
886 public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
887 return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
888
889 @Override
890 public String call(Integer t1) {
891 System.out.println("Received: " + t1 + " on group : " + group.getKey());
892 return "first groups: " + t1;
893 }
894
895 });
896 }
897
898 }).doOnEach(new Action1<Notification<? super String>>() {
899
900 @Override
901 public void call(Notification<? super String> t1) {
902 System.out.println("notification => " + t1);
903 }
904
905 }).toBlocking().forEach(new Action1<String>() {
906
907 @Override
908 public void call(String s) {
909 results.add(s);
910 }
911
912 });
913
914 System.out.println("Results: " + results);
915 assertEquals(4, results.size());
916 }
917
918 @Test
919 public void testGroupsWithNestedObserveOn() throws InterruptedException {
920 final ArrayList<String> results = new ArrayList<String>();
921 Observable.create(new OnSubscribe<Integer>() {
922
923 @Override
924 public void call(Subscriber<? super Integer> sub) {
925 sub.onNext(1);
926 sub.onNext(2);
927 sub.onNext(1);
928 sub.onNext(2);
929 sub.onCompleted();
930 }
931
932 }).groupBy(new Func1<Integer, Integer>() {
933
934 @Override
935 public Integer call(Integer t) {
936 return t;
937 }
938
939 }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
940
941 @Override
942 public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
943 return group.observeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
944
945 @Override
946 public String call(Integer t1) {
947 return "first groups: " + t1;
948 }
949
950 });
951 }
952
953 }).toBlocking().forEach(new Action1<String>() {
954
955 @Override
956 public void call(String s) {
957 results.add(s);
958 }
959
960 });
961
962 System.out.println("Results: " + results);
963 assertEquals(4, results.size());
964 }
965
966 private static class Event {
967 int source;
968 String message;
969
970 @Override
971 public String toString() {
972 return "Event => source: " + source + " message: " + message;
973 }
974 }
975
976 Observable<Event> ASYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) {
977 return SYNC_INFINITE_OBSERVABLE_OF_EVENT(numGroups, subscribeCounter, sentEventCounter).subscribeOn(Schedulers.newThread());
978 };
979
980 Observable<Event> SYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) {
981 return Observable.create(new OnSubscribe<Event>() {
982
983 @Override
984 public void call(final Subscriber<? super Event> op) {
985 subscribeCounter.incrementAndGet();
986 int i = 0;
987 while (!op.isUnsubscribed()) {
988 i++;
989 Event e = new Event();
990 e.source = i % numGroups;
991 e.message = "Event-" + i;
992 op.onNext(e);
993 sentEventCounter.incrementAndGet();
994 }
995 op.onCompleted();
996 }
997
998 });
999 };
1000
1001 @Test
1002 public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
1003
1004
1005 Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
1006
1007
1008 Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
1009
1010
1011 @SuppressWarnings("unchecked")
1012 Observer<GroupedObservable<Boolean, Long>> o1 = mock(Observer.class);
1013 @SuppressWarnings("unchecked")
1014 Observer<GroupedObservable<Boolean, Long>> o2 = mock(Observer.class);
1015
1016
1017 stream.subscribe(o1);
1018 stream.subscribe(o2);
1019
1020
1021 verify(o1, never()).onError(Matchers.<Throwable> any());
1022 verify(o2, never()).onError(Matchers.<Throwable> any());
1023 }
1024
1025 private static Func1<Long, Boolean> IS_EVEN = new Func1<Long, Boolean>() {
1026
1027 @Override
1028 public Boolean call(Long n) {
1029 return n % 2 == 0;
1030 }
1031 };
1032
1033 private static Func1<Integer, Boolean> IS_EVEN2 = new Func1<Integer, Boolean>() {
1034
1035 @Override
1036 public Boolean call(Integer n) {
1037 return n % 2 == 0;
1038 }
1039 };
1040
1041 @Test
1042 public void testGroupByBackpressure() throws InterruptedException {
1043
1044 TestSubscriber<String> ts = new TestSubscriber<String>();
1045
1046 Observable.range(1, 4000)
1047 .groupBy(IS_EVEN2)
1048 .flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1049
1050 @Override
1051 public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1052 return g.observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1053
1054 @Override
1055 public String call(Integer l) {
1056 if (g.getKey()) {
1057 try {
1058 Thread.sleep(1);
1059 } catch (InterruptedException e) {
1060 }
1061 return l + " is even.";
1062 } else {
1063 return l + " is odd.";
1064 }
1065 }
1066
1067 });
1068 }
1069
1070 }).subscribe(ts);
1071 ts.awaitTerminalEvent();
1072 ts.assertNoErrors();
1073 }
1074
1075 <T, R> Func1<T, R> just(final R value) {
1076 return new Func1<T, R>() {
1077 @Override
1078 public R call(T t1) {
1079 return value;
1080 }
1081 };
1082 }
1083
1084 <T> Func1<Integer, T> fail(T dummy) {
1085 return new Func1<Integer, T>() {
1086 @Override
1087 public T call(Integer t1) {
1088 throw new RuntimeException("Forced failure");
1089 }
1090 };
1091 }
1092
1093 <T, R> Func1<T, R> fail2(R dummy2) {
1094 return new Func1<T, R>() {
1095 @Override
1096 public R call(T t1) {
1097 throw new RuntimeException("Forced failure");
1098 }
1099 };
1100 }
1101
1102 Func1<Integer, Integer> dbl = new Func1<Integer, Integer>() {
1103 @Override
1104 public Integer call(Integer t1) {
1105 return t1 * 2;
1106 }
1107 };
1108 Func1<Integer, Integer> identity = UtilityFunctions.identity();
1109
1110 @Before
1111 public void before() {
1112 MockitoAnnotations.initMocks(this);
1113 }
1114
1115 @Test
1116 public void normalBehavior() {
1117 Observable<String> source = Observable.from(Arrays.asList(
1118 " foo",
1119 " FoO ",
1120 "baR ",
1121 "foO ",
1122 " Baz ",
1123 " qux ",
1124 " bar",
1125 " BAR ",
1126 "FOO ",
1127 "baz ",
1128 " bAZ ",
1129 " fOo "
1130 ));
1131
1132
1133
1134
1135
1136
1137
1138
1139 Func1<String, String> keysel = new Func1<String, String>() {
1140 @Override
1141 public String call(String t1) {
1142 return t1.trim().toLowerCase();
1143 }
1144 };
1145 Func1<String, String> valuesel = new Func1<String, String>() {
1146 @Override
1147 public String call(String t1) {
1148 return t1 + t1;
1149 }
1150 };
1151
1152 Observable<String> m = source.groupBy(
1153 keysel, valuesel).flatMap(new Func1<GroupedObservable<String, String>, Observable<String>>() {
1154
1155 @Override
1156 public Observable<String> call(final GroupedObservable<String, String> g) {
1157 System.out.println("-----------> NEXT: " + g.getKey());
1158 return g.take(2).map(new Func1<String, String>() {
1159
1160 int count = 0;
1161
1162 @Override
1163 public String call(String v) {
1164 return g.getKey() + "-" + count++;
1165 }
1166
1167 });
1168 }
1169
1170 });
1171
1172 TestSubscriber<String> ts = new TestSubscriber<String>();
1173 m.subscribe(ts);
1174 ts.awaitTerminalEvent();
1175 System.out.println("ts .get " + ts.getOnNextEvents());
1176 ts.assertNoErrors();
1177 assertEquals(ts.getOnNextEvents(),
1178 Arrays.asList("foo-0", "foo-1", "bar-0", "foo-0", "baz-0", "qux-0", "bar-1", "bar-0", "foo-1", "baz-1", "baz-0", "foo-0"));
1179
1180 }
1181
1182 @Test
1183 public void keySelectorThrows() {
1184 Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
1185
1186 Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
1187
1188 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1189 m.subscribe(ts);
1190 ts.awaitTerminalEvent();
1191 assertEquals(1, ts.getOnErrorEvents().size());
1192 assertEquals(0, ts.getOnNextEvents().size());
1193 }
1194
1195 @Test
1196 public void valueSelectorThrows() {
1197 Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
1198
1199 Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
1200 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1201 m.subscribe(ts);
1202 ts.awaitTerminalEvent();
1203 assertEquals(1, ts.getOnErrorEvents().size());
1204 assertEquals(0, ts.getOnNextEvents().size());
1205
1206 }
1207
1208 @Test
1209 public void innerEscapeCompleted() {
1210 Observable<Integer> source = Observable.just(0);
1211
1212 Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
1213
1214 TestSubscriber<Object> ts = new TestSubscriber<Object>();
1215 m.subscribe(ts);
1216 ts.awaitTerminalEvent();
1217 ts.assertNoErrors();
1218 System.out.println(ts.getOnNextEvents());
1219 }
1220
1221
1222
1223
1224 @Test
1225 public void testExceptionIfSubscribeToChildMoreThanOnce() {
1226 Observable<Integer> source = Observable.just(0);
1227
1228 final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
1229
1230 Observable<GroupedObservable<Integer, Integer>> m = source.groupBy(identity, dbl);
1231
1232 m.subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
1233 @Override
1234 public void call(GroupedObservable<Integer, Integer> t1) {
1235 inner.set(t1);
1236 }
1237 });
1238
1239 inner.get().subscribe();
1240
1241 @SuppressWarnings("unchecked")
1242 Observer<Integer> o2 = mock(Observer.class);
1243
1244 inner.get().subscribe(o2);
1245
1246 verify(o2, never()).onCompleted();
1247 verify(o2, never()).onNext(anyInt());
1248 verify(o2).onError(any(IllegalStateException.class));
1249 }
1250
1251 @Test
1252 public void testError2() {
1253 Observable<Integer> source = Observable.concat(Observable.just(0),
1254 Observable.<Integer> error(new TestException("Forced failure")));
1255
1256 Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
1257
1258 TestSubscriber<Object> ts = new TestSubscriber<Object>();
1259 m.subscribe(ts);
1260 ts.awaitTerminalEvent();
1261 assertEquals(1, ts.getOnErrorEvents().size());
1262 assertEquals(1, ts.getOnNextEvents().size());
1263 }
1264
1265 @Test
1266 public void testgroupByBackpressure() throws InterruptedException {
1267 TestSubscriber<String> ts = new TestSubscriber<String>();
1268
1269 Observable.range(1, 4000).groupBy(IS_EVEN2).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1270
1271 @Override
1272 public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1273 return g.doOnCompleted(new Action0() {
1274
1275 @Override
1276 public void call() {
1277 System.out.println("//////////////////// COMPLETED-A");
1278 }
1279
1280 }).observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1281
1282 int c = 0;
1283
1284 @Override
1285 public String call(Integer l) {
1286 if (g.getKey()) {
1287 if (c++ < 400) {
1288 try {
1289 Thread.sleep(1);
1290 } catch (InterruptedException e) {
1291 }
1292 }
1293 return l + " is even.";
1294 } else {
1295 return l + " is odd.";
1296 }
1297 }
1298
1299 }).doOnCompleted(new Action0() {
1300
1301 @Override
1302 public void call() {
1303 System.out.println("//////////////////// COMPLETED-B");
1304 }
1305
1306 });
1307 }
1308
1309 }).doOnEach(new Action1<Notification<? super String>>() {
1310
1311 @Override
1312 public void call(Notification<? super String> t1) {
1313 System.out.println("NEXT: " + t1);
1314 }
1315
1316 }).subscribe(ts);
1317 ts.awaitTerminalEvent();
1318 ts.assertNoErrors();
1319 }
1320
1321 @Test
1322 public void testgroupByBackpressure2() throws InterruptedException {
1323
1324 TestSubscriber<String> ts = new TestSubscriber<String>();
1325
1326 Observable.range(1, 4000).groupBy(IS_EVEN2).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1327
1328 @Override
1329 public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1330 return g.take(2).observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1331
1332 @Override
1333 public String call(Integer l) {
1334 if (g.getKey()) {
1335 try {
1336 Thread.sleep(1);
1337 } catch (InterruptedException e) {
1338 }
1339 return l + " is even.";
1340 } else {
1341 return l + " is odd.";
1342 }
1343 }
1344
1345 });
1346 }
1347
1348 }).subscribe(ts);
1349 ts.awaitTerminalEvent();
1350 ts.assertNoErrors();
1351 }
1352
1353 static Func1<GroupedObservable<Integer, Integer>, Observable<Integer>> FLATTEN_INTEGER = new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
1354
1355 @Override
1356 public Observable<Integer> call(GroupedObservable<Integer, Integer> t) {
1357 return t;
1358 }
1359
1360 };
1361
1362 @Test
1363 public void testGroupByWithNullKey() {
1364 final String[] key = new String[]{"uninitialized"};
1365 final List<String> values = new ArrayList<String>();
1366 Observable.just("a", "b", "c").groupBy(new Func1<String, String>() {
1367
1368 @Override
1369 public String call(String value) {
1370 return null;
1371 }
1372 }).subscribe(new Action1<GroupedObservable<String, String>>() {
1373
1374 @Override
1375 public void call(GroupedObservable<String, String> groupedObservable) {
1376 key[0] = groupedObservable.getKey();
1377 groupedObservable.subscribe(new Action1<String>() {
1378
1379 @Override
1380 public void call(String s) {
1381 values.add(s);
1382 }
1383 });
1384 }
1385 });
1386 assertEquals(null, key[0]);
1387 assertEquals(Arrays.asList("a", "b", "c"), values);
1388 }
1389
1390 @Test
1391 public void testGroupByUnsubscribe() {
1392 final Subscription s = mock(Subscription.class);
1393 Observable<Integer> o = Observable.create(
1394 new OnSubscribe<Integer>() {
1395 @Override
1396 public void call(Subscriber<? super Integer> subscriber) {
1397 subscriber.add(s);
1398 }
1399 }
1400 );
1401 o.groupBy(new Func1<Integer, Integer>() {
1402
1403 @Override
1404 public Integer call(Integer integer) {
1405 return null;
1406 }
1407 }).subscribe().unsubscribe();
1408 verify(s).unsubscribe();
1409 }
1410
1411 @Test
1412 public void testGroupByShouldPropagateError() {
1413 final Throwable e = new RuntimeException("Oops");
1414 final TestSubscriber<Integer> inner1 = new TestSubscriber<Integer>();
1415 final TestSubscriber<Integer> inner2 = new TestSubscriber<Integer>();
1416
1417 final TestSubscriber<GroupedObservable<Integer, Integer>> outer
1418 = new TestSubscriber<GroupedObservable<Integer, Integer>>(new Subscriber<GroupedObservable<Integer, Integer>>() {
1419
1420 @Override
1421 public void onCompleted() {
1422 }
1423
1424 @Override
1425 public void onError(Throwable e) {
1426 }
1427
1428 @Override
1429 public void onNext(GroupedObservable<Integer, Integer> o) {
1430 if (o.getKey() == 0) {
1431 o.subscribe(inner1);
1432 } else {
1433 o.subscribe(inner2);
1434 }
1435 }
1436 });
1437 Observable.create(
1438 new OnSubscribe<Integer>() {
1439 @Override
1440 public void call(Subscriber<? super Integer> subscriber) {
1441 subscriber.onNext(0);
1442 subscriber.onNext(1);
1443 subscriber.onError(e);
1444 }
1445 }
1446 ).groupBy(new Func1<Integer, Integer>() {
1447
1448 @Override
1449 public Integer call(Integer i) {
1450 return i % 2;
1451 }
1452 }).subscribe(outer);
1453 assertEquals(Arrays.asList(e), outer.getOnErrorEvents());
1454 assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
1455 assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
1456 }
1457 }